---
title: Observability with Callbacks
description: Learn how to implement comprehensive observability for Agent Squad using callbacks
---

import { Tabs, TabItem } from '@astrojs/starlight/components';

Agent Squad provides powerful observability capabilities through a comprehensive callback system that allows you to track, monitor, and analyze the behavior of your multi-agent system. This guide covers the callback system and demonstrates how to integrate with Langfuse for advanced observability.

## Callbacks System Overview

The Agent Squad framework implements three main types of callbacks to provide complete visibility into your system:

- **Agent Callbacks**: Track agent lifecycle, execution, and LLM interactions
- **Classifier Callbacks**: Monitor request classification and routing decisions
- **Tool Callbacks**: Observe tool usage and execution

### Agent Callbacks

Agent callbacks provide hooks into the agent execution lifecycle:

<Tabs syncKey="runtime">
  <TabItem label="Python" icon="seti:python">

```python
from agent_squad.agents import AgentCallbacks
from typing import Optional, Any, UUID

class CustomAgentCallbacks(AgentCallbacks):

    async def on_agent_start(
        self,
        agent_name: str,
        input: Any,
        messages: list[Any],
        run_id: Optional[UUID] = None,
        tags: Optional[list[str]] = None,
        metadata: Optional[dict[str, Any]] = None,
        **kwargs: Any,
    ) -> dict:
        """Called when an agent starts processing"""
        print(f"Agent {agent_name} starting with input: {input}")
        return {"start_time": time.time()}

    async def on_agent_end(
        self,
        agent_name: str,
        response: Any,
        messages: list[Any],
        run_id: Optional[UUID] = None,
        tags: Optional[list[str]] = None,
        metadata: Optional[dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when an agent completes processing"""
        print(f"Agent {agent_name} completed")

    async def on_llm_start(
        self,
        name: str,
        input: Any,
        run_id: Optional[UUID] = None,
        tags: Optional[list[str]] = None,
        metadata: Optional[dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when LLM processing starts"""
        print(f"LLM {name} starting")

    async def on_llm_end(
        self,
        name: str,
        output: Any,
        run_id: Optional[UUID] = None,
        tags: Optional[list[str]] = None,
        metadata: Optional[dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when LLM processing ends"""
        print(f"LLM {name} completed")

    async def on_llm_new_token(
        self,
        token: str,
        **kwargs: Any
    ) -> None:
        """Called for each new token in streaming responses"""
        print(f"New token: {token}")
```

  </TabItem>
  <TabItem label="TypeScript" icon="seti:typescript" color="blue">

```typescript
// TypeScript callback implementation coming soon
```

  </TabItem>
</Tabs>

### Classifier Callbacks

Monitor request classification and routing decisions:

<Tabs syncKey="runtime">
  <TabItem label="Python" icon="seti:python">

```python
from agent_squad.classifiers import ClassifierCallbacks, ClassifierResult
from typing import Optional, Any, UUID

class CustomClassifierCallbacks(ClassifierCallbacks):

    async def on_classifier_start(
        self,
        name: str,
        input: Any,
        run_id: Optional[UUID] = None,
        tags: Optional[list[str]] = None,
        metadata: Optional[dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when classification starts"""
        print(f"Classifier {name} analyzing: {input}")

    async def on_classifier_stop(
        self,
        name: str,
        output: ClassifierResult,
        run_id: Optional[UUID] = None,
        tags: Optional[list[str]] = None,
        metadata: Optional[dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when classification completes"""
        selected_agent = output.selected_agent.name if output.selected_agent else "None"
        print(f"Classifier selected: {selected_agent} with confidence: {output.confidence}")
```

  </TabItem>
  <TabItem label="TypeScript" icon="seti:typescript" color="blue">

```typescript
// TypeScript callback implementation coming soon
```

  </TabItem>
</Tabs>

### Tool Callbacks

Track tool execution and performance:

<Tabs syncKey="runtime">
  <TabItem label="Python" icon="seti:python">

```python
from agent_squad.utils import AgentToolCallbacks
from typing import Optional, Any, UUID

class CustomToolCallbacks(AgentToolCallbacks):

    async def on_tool_start(
        self,
        tool_name: str,
        input: Any,
        run_id: Optional[UUID] = None,
        tags: Optional[list[str]] = None,
        metadata: Optional[dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when tool execution starts"""
        print(f"Tool {tool_name} executing with input: {input}")

    async def on_tool_end(
        self,
        tool_name: str,
        input: Any,
        output: dict,
        run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Called when tool execution completes"""
        print(f"Tool {tool_name} completed with output: {output}")
```

  </TabItem>
  <TabItem label="TypeScript" icon="seti:typescript" color="blue">

```typescript
// TypeScript callback implementation coming soon
```

  </TabItem>
</Tabs>

## Langfuse Integration Demo

The [Langfuse demo](https://github.com/awslabs/agent-squad/tree/main/examples/langfuse-demo) provides a comprehensive example of implementing observability with Langfuse, a powerful open-source observability platform for LLM applications.

### Features Demonstrated

The Langfuse demo showcases:

- **Complete conversation tracing** - Track entire user sessions from start to finish
- **Agent classification monitoring** - See which agents are selected and why
- **LLM usage tracking** - Monitor token consumption, costs, and response times
- **Tool execution visibility** - Observe tool calls and their outcomes
- **Performance analytics** - Analyze bottlenecks and optimization opportunities

### Setup and Configuration

1. **Install Dependencies**:

```bash
cd examples/langfuse-demo
pip install -r requirements.txt
```

2. **Configure Environment**:

```bash
# .env file
LANGFUSE_PUBLIC_KEY=your_langfuse_public_key
LANGFUSE_SECRET_KEY=your_langfuse_secret_key
LANGFUSE_HOST=https://cloud.langfuse.com

AWS_ACCESS_KEY_ID=your_aws_access_key
AWS_SECRET_ACCESS_KEY=your_aws_secret_key
AWS_DEFAULT_REGION=your_aws_region
```

### Implementation Example

Here's how the Langfuse demo implements comprehensive observability:

<Tabs syncKey="runtime">
  <TabItem label="Python" icon="seti:python">

```python
from langfuse.decorators import observe, langfuse_context
from langfuse import Langfuse
from datetime import datetime, timezone

# Initialize Langfuse
langfuse = Langfuse()

class LangfuseAgentCallbacks(AgentCallbacks):

    async def on_agent_start(self, agent_name, payload_input, messages, **kwargs):
        """Track agent execution start"""
        langfuse_context.update_current_observation(
            input=payload_input,
            start_time=datetime.now(timezone.utc),
            name=agent_name,
            tags=kwargs.get('tags'),
            metadata=kwargs.get('metadata')
        )

    async def on_agent_end(self, agent_name, response, messages, **kwargs):
        """Track agent execution completion"""
        langfuse_context.update_current_observation(
            end_time=datetime.now(timezone.utc),
            name=agent_name,
            output=response,
            user_id=kwargs.get('user_id'),
            session_id=kwargs.get('session_id')
        )

    @observe(as_type='generation', capture_input=False)
    async def on_llm_end(self, name, output, **kwargs):
        """Track LLM generation with detailed metrics"""
        input_data = kwargs.get('payload_input', {})
        messages = [{'role': 'system', 'content': input_data.get('system')}]
        messages.extend(input_data.get('messages', []))

        langfuse_context.update_current_observation(
            name=name,
            input=messages,
            output=output,
            model=input_data.get('modelId'),
            model_parameters=kwargs.get('inferenceConfig'),
            usage={
                'input': kwargs.get('usage', {}).get('inputTokens'),
                'output': kwargs.get('usage', {}).get('outputTokens'),
                'total': kwargs.get('usage', {}).get('totalTokens')
            }
        )

class LangfuseClassifierCallbacks(ClassifierCallbacks):

    async def on_classifier_start(self, name, payload_input, **kwargs):
        """Track classification start"""
        inputs = [
            {'role': 'system', 'content': kwargs.get('system')},
            {'role': 'user', 'content': payload_input}
        ]
        langfuse_context.update_current_observation(
            name=name,
            start_time=datetime.now(timezone.utc),
            input=inputs,
            model=kwargs.get('modelId'),
            model_parameters=kwargs.get('inferenceConfig')
        )

    async def on_classifier_stop(self, name, output, **kwargs):
        """Track classification results"""
        langfuse_context.update_current_observation(
            output={
                'role': 'assistant',
                'content': {
                    'selected_agent': output.selected_agent.name if output.selected_agent else 'None',
                    'confidence': output.confidence
                }
            },
            end_time=datetime.now(timezone.utc),
            usage={
                'input': kwargs.get('usage', {}).get('inputTokens'),
                'output': kwargs.get('usage', {}).get('outputTokens'),
                'total': kwargs.get('usage', {}).get('totalTokens')
            }
        )

@observe(as_type="generation", name="conversation")
def run_conversation():
    """Main conversation loop with full tracing"""
    # Your orchestrator setup and conversation handling
    pass
```

  </TabItem>
  <TabItem label="TypeScript" icon="seti:typescript" color="blue">

```typescript
// TypeScript Langfuse integration coming soon
```

  </TabItem>
</Tabs>

### Tracing Structure

The Langfuse integration creates a hierarchical trace structure:

```
Conversation (Generation)
├── Classification (Generation)
│   └── Classifier LLM Call (Generation)
├── Agent Processing (Span)
│   ├── Agent LLM Call (Generation)
│   └── Tool Calls (Spans)
│       ├── Tool Execution (Span)
│       └── Tool Results (Span)
└── Response Assembly (Span)
```

### Trace Visualization

Here's what a complete trace looks like in the Langfuse dashboard:

![Langfuse Trace Example](/agent-squad/langfuse-trace.png)

This trace shows the complete flow of a user request, including classification, agent selection, LLM calls, and tool execution, with detailed timing and token usage information.

### Analytics and Insights

With Langfuse integration, you can analyze:

- **Agent Usage Patterns**: Which agents are most frequently selected
- **Classification Accuracy**: How well the classifier routes requests
- **Performance Metrics**: Response times, token usage, and costs
- **Error Tracking**: Failed requests and their causes
- **User Behavior**: Session patterns and conversation flows

## Best Practices

### 1. Implement Comprehensive Callbacks

```python
class ProductionCallbacks(AgentCallbacks):
    def __init__(self, logger, metrics_client):
        self.logger = logger
        self.metrics = metrics_client

    async def on_agent_start(self, agent_name, **kwargs):
        # Log structured data
        self.logger.info("agent_start", extra={
            "agent_name": agent_name,
            "user_id": kwargs.get("user_id"),
            "session_id": kwargs.get("session_id")
        })

        # Send metrics
        self.metrics.increment("agent.invocations", tags=[f"agent:{agent_name}"])

    async def on_llm_end(self, name, output, **kwargs):
        # Track token usage
        usage = kwargs.get('usage', {})
        self.metrics.gauge("llm.tokens.input", usage.get('inputTokens', 0))
        self.metrics.gauge("llm.tokens.output", usage.get('outputTokens', 0))
```

### 2. Handle Errors Gracefully

```python
class RobustCallbacks(AgentCallbacks):
    async def on_agent_start(self, **kwargs):
        try:
            # Your observability logic
            pass
        except Exception as e:
            # Never let observability break your application
            logging.error(f"Callback error: {e}")
```

### 3. Use Sampling for High-Volume Applications

```python
import random

class SampledCallbacks(AgentCallbacks):
    def __init__(self, sample_rate=0.1):
        self.sample_rate = sample_rate

    async def on_agent_start(self, **kwargs):
        if random.random() < self.sample_rate:
            # Only trace a percentage of requests
            await self.full_trace(**kwargs)
```

### 4. Correlate Across Services

```python
class CorrelatedCallbacks(AgentCallbacks):
    async def on_agent_start(self, **kwargs):
        # Propagate trace context across service boundaries
        trace_id = kwargs.get('trace_id') or generate_trace_id()
        self.set_trace_context(trace_id)
```

## Integration with Other Observability Tools

The callback system is designed to work with various observability platforms:

### OpenTelemetry

```python
from opentelemetry import trace

class OTelCallbacks(AgentCallbacks):
    def __init__(self):
        self.tracer = trace.get_tracer(__name__)

    async def on_agent_start(self, agent_name, **kwargs):
        with self.tracer.start_as_current_span(f"agent_{agent_name}") as span:
            span.set_attribute("agent.name", agent_name)
            span.set_attribute("user.id", kwargs.get("user_id"))
```

### DataDog

```python
from ddtrace import tracer

class DataDogCallbacks(AgentCallbacks):
    async def on_agent_start(self, agent_name, **kwargs):
        with tracer.trace("agent.process", service="agent-squad") as span:
            span.set_tag("agent.name", agent_name)
            span.set_tag("user.id", kwargs.get("user_id"))
```

### Custom Metrics

```python
import time
from prometheus_client import Counter, Histogram

AGENT_INVOCATIONS = Counter('agent_invocations_total', 'Agent invocations', ['agent_name'])
AGENT_DURATION = Histogram('agent_duration_seconds', 'Agent processing time', ['agent_name'])

class MetricsCallbacks(AgentCallbacks):
    async def on_agent_start(self, agent_name, **kwargs):
        AGENT_INVOCATIONS.labels(agent_name=agent_name).inc()
        kwargs['start_time'] = time.time()

    async def on_agent_end(self, agent_name, **kwargs):
        duration = time.time() - kwargs.get('start_time', 0)
        AGENT_DURATION.labels(agent_name=agent_name).observe(duration)
```

## Running the Langfuse Demo

To see the observability system in action:

1. **Start the demo**:
```bash
cd examples/langfuse-demo
python main.py
```

2. **Interact with the system**:
```
You: What's the weather in San Francisco?
You: Tell me about AI trends
You: How to improve my sleep?
```

3. **View traces in Langfuse**:
   - Navigate to your Langfuse dashboard
   - Explore conversation traces
   - Analyze agent selection patterns
   - Monitor performance metrics

The demo provides a complete template for implementing production-ready observability in your Agent Squad applications.
